package com.k2de.consumer.impl;

import com.k2data.k2de.client.KafkaClient;
import com.k2data.k2de.utils.RandomNumber;
import com.k2de.consumer.config.ConstantsForConsumer;
import com.k2de.consumer.inter.KafkaSenderInter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;

/**
 * Kafka producer that sends messages to the KMX exception region.
 *
 * <p>{@link #init(String)} must complete successfully before {@link #send(String)} can
 * deliver anything. Both methods are best-effort: failures are logged and not rethrown.
 *
 * Created by chenjingshuai on 18-1-9.
 */
public class ExceptionRegionSenderImpl implements KafkaSenderInter<String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ExceptionRegionSenderImpl.class);
    // Kafka client for the exception topic; remains null until init() succeeds.
    private KafkaClient clientForEx;


    /**
     * Creates the Kafka client bound to the exception topic
     * ({@code ConstantsForConsumer.KAFKA_TOPIC_NAME_EXCEPTION}).
     *
     * <p>If construction fails, the failure is logged with its stack trace and the client
     * field is left unchanged, so a later {@link #send(String)} will drop its message.
     *
     * @param kafkaServerUrl Kafka server URL passed to the client constructor
     */
    @Override
    public void init(String kafkaServerUrl) {
        try {
            clientForEx = new KafkaClient(ConstantsForConsumer.KAFKA_TOPIC_NAME_EXCEPTION, kafkaServerUrl);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            LOGGER.warn("Interrupted while creating Kafka client for exception topic (server: {}).",
                    kafkaServerUrl, e);
        } catch (ExecutionException e) {
            LOGGER.warn("Failed to create Kafka client for exception topic (server: {}).",
                    kafkaServerUrl, e);
        }
    }

    /**
     * Sends one message to the exception region, keyed with
     * {@code ConstantsForConsumer.KAFKA_EXCEPTION_KEY_PREFIX} followed by a value from
     * {@code RandomNumber.getRandomNum()}.
     *
     * <p>Failures are logged and not propagated. If the client has not been initialized,
     * the message is dropped and an error is logged.
     *
     * @param sendObj message payload to send
     */
    @Override
    public void send(String sendObj) {
        if (clientForEx == null) {
            // init() failed or was never called; without this guard the call below would
            // throw a NullPointerException that carries no context.
            LOGGER.error("Kafka client for exception topic is not initialized; dropping message: {}", sendObj);
            return;
        }
        try {
            String messageKey = ConstantsForConsumer.KAFKA_EXCEPTION_KEY_PREFIX + RandomNumber.getRandomNum();
            LOGGER.debug("Send Message(Key:{}, Message:{}) to exceptionRegion.", messageKey, sendObj);

            clientForEx.sendMessageAsync(messageKey, sendObj);

            LOGGER.debug("The process of sending exception message success.");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            LOGGER.warn("Send Exception Message occurs exception:{}.", e.getMessage(), e);
        } catch (ExecutionException e) {
            // Pass the throwable as the last argument so SLF4J records the stack trace.
            LOGGER.warn("Send Exception Message occurs exception:{}.", e.getMessage(), e);
        }
    }
}
